posix匿名信号量与互斥锁 示例生产者消费者问题

posix 信号量

system v信号量只能用于进程间同步,而posix信号量除了可以进程间同步,还可以线程间同步。system v信号量每次PV操作可以是N,但posix信号量每次PV只能是1。除此之外,posix信号量还有命名和匿名之分。

命名信号量

名字以/somename形式分辨,只能有一个/ ,且总长不能超过NAME_MAX - 4(一般是251)。
需要用sem_open函数创建或打开,PV操作分别是sem_wait和sem_post,可以使用sem_close 关闭,删除用sem_unlink。命名信号量用于不共享内存的进程间同步,类似system v信号量。

匿名信号量

存放在一块共享内存中。如果是线程共享,这块区域可以是全局变量;如果是进程共享,可以是system v共享内存(shmget创建,shmat映射),也可以是posix共享内存(shm_open创建,mmap 映射)。

匿名信号量必须用sem_init初始化,sem_init函数其中一个参数pshared决定了线程共享还是进程共享,也可以用sem_post和sem_wait进行操作,在共享内存释放前,匿名信号量要先用sem_destroy 销毁

互斥锁

Mutex用pthread_mutex_t类型的变量表示,pthread_mutex_init函数对Mutex做初始化,参数attr设定Mutex的属性,如果attr为NULL则表示缺省属性。建议开发中总是设置PTHREAD_MUTEX_RECURSIV属性,避免死锁。如果将互斥锁属性的类型设置为PTHREAD_MUTEX_RECURSIVE,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。

// 互斥量属性: 同一线程可多次加锁
pthread_mutexattr_t m_attr;
pthread_mutexattr_init(&m_attr);
pthread_mutexattr_settype(&m_attr, PTHREAD_MUTEX_RECURSIVE);
struct pthread_mutexattr_t
{
    enum lock_type    // 使用pthread_mutexattr_settype来更改
    {
         PTHREAD_MUTEX_TIMED_NP [default]//当一个线程加锁后,其余请求锁的线程形成等待队列,在解锁后按优先级获得锁。
         PTHREAD_MUTEX_ADAPTIVE_NP       // 动作最简单的锁类型,解锁后所有线程重新竞争。
         PTHREAD_MUTEX_RECURSIVE_NP      // 允许同一线程对同一锁成功获得多次。当然也要解锁多次。其余线程在解锁时重新竞争。
         PTHREAD_MUTEX_ERRORCHECK_NP     // 若同一线程请求同一锁,返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP动作相同。
    } type;
} attr;

用pthread_mutex_init函数初始化的Mutex可以用pthread_mutex_destroy销毁。如果Mutex变量是静态分配的(全局变量或static变量),也可以用宏定义PTHREAD_MUTEX_INITIALIZER来初始化,相当于用pthread_mutex_init初始化并且attr参数为NULL。

如果一个线程既想获得锁,又不想挂起等待,可以调用pthread_mutex_trylock,如果Mutex已经被另一个线程获得,这个函数会失败返回EBUSY,而不会使线程挂起等待。

生产者消费者问题

#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10

int g_buffer[BUFFSIZE];

unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0;
unsigned short consume_id = 0;

sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;

pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];

void *consume(void *arg)
{
    int i;
    int num = (int)arg;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);

        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == out)
                printf("\t<--consume");

            printf("\n");
        }
        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("%d end consume product %d\n", num, consume_id);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);
        sleep(1);
    }
    return NULL;
}

void *produce(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == in)
                printf("\t<--produce");

            printf("\n");
        }

        printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);
        sleep(5);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i = 0; i < BUFFSIZE; i++)
        g_buffer[i] = -1;

    sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);

    pthread_mutex_init(&g_mutex, NULL);

    for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)i);

    for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i);

    for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);

    return 0;
}

与System V信号量的程序相比,程序逻辑没太大变化,只是用pthread_mutex_lock替代了sem_mutex。

上述程序生产者消费者各一个线程,但生产者睡眠时间是消费者的5倍,故消费者会经常阻塞在sem_wait(&g_sem_empty)上面,因为缓冲区经常为空。可以将PRODUCTORS_COUNT改成5,即有5个生产者线程和1个消费者线程,而且生产者睡眠时间还是消费者的5倍,基本上可以保持动态平衡,即5个生产者一下子生产了5份东西,消费者1s消费1份,刚好在生产者继续生产前消费完。

自旋锁和读写锁简介

自旋锁

自旋锁类似于互斥锁,它的性能比互斥锁更高。自旋锁与互斥锁很重要的一个区别在于,线程在申请自旋锁的时候,线程不会被挂起,它处于忙等待的状态,一般用于等待时间比较短的情形

pthread_spin_init
pthread_spin_destroy
pthread_spin_lock
pthread_spin_unlock

读写锁

只要没有线程持有给定的读写锁用于写,那么任意数目的线程可以持有读写锁用于读。仅当没有线程持有某个给定的读写锁用于读或用于写时,才能分配读写锁用于写。读写锁用于读称为共享锁,读写锁用于写称为排它锁。

pthread_rwlock_init
pthread_rwlock_destroy
int pthread_rwlock_rdlock
int pthread_rwlock_wrlock
int pthread_rwlock_unlock